/*-------------------------------------------------------------------------
 *
 * shard_rebalancer.h
 *
 * Type and function declarations for the shard rebalancer tool.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#ifndef SHARD_REBALANCER_H
#define SHARD_REBALANCER_H

#include "postgres.h"

#include "fmgr.h"

#include "nodes/pg_list.h"

#include "distributed/coordinator_protocol.h"
#include "distributed/worker_manager.h"

/* Limits for function parameters */
#define SHARD_REPLICATION_FACTOR_MINIMUM 1
#define SHARD_REPLICATION_FACTOR_MAXIMUM 100

/* Definitions for metadata update commands */
#define INSERT_SHARD_PLACEMENT_COMMAND                                                 \
    "INSERT INTO pg_dist_shard_placement VALUES(" UINT64_FORMAT ", %d, " UINT64_FORMAT \
    ", '%s', %d)"
#define DELETE_SHARD_PLACEMENT_COMMAND           \
    "DELETE FROM pg_dist_shard_placement WHERE " \
    "shardid=" UINT64_FORMAT " AND nodename='%s' AND nodeport=%d"

/*
 * Definitions for shard placement json field names. These names should match
 * the column names in pg_dist_shard_placement.
 */
#define FIELD_NAME_SHARD_ID "shardid"
#define FIELD_NAME_SHARD_LENGTH "shardlength"
#define FIELD_NAME_SHARD_STATE "shardstate"
#define FIELD_NAME_NODE_NAME "nodename"
#define FIELD_NAME_NODE_PORT "nodeport"
#define FIELD_NAME_PLACEMENT_ID "placementid"

/*
 * Definitions for worker node json field names. These names should match the
 * column names in master_get_active_worker_nodes().
 */
#define FIELD_NAME_WORKER_NAME "node_name"
#define FIELD_NAME_WORKER_PORT "node_port"

/* Definitions for placement update json field names */
#define FIELD_NAME_UPDATE_TYPE "updatetype"
#define FIELD_NAME_SOURCE_NAME "sourcename"
#define FIELD_NAME_SOURCE_PORT "sourceport"
#define FIELD_NAME_TARGET_NAME "targetname"
#define FIELD_NAME_TARGET_PORT "targetport"

/* *INDENT-OFF* */
/* Definition for format of placement update json document */
#define PLACEMENT_UPDATE_JSON_FORMAT                 \
    "{"                                              \
    "\"" FIELD_NAME_UPDATE_TYPE "\":%d,"             \
    "\"" FIELD_NAME_SHARD_ID "\":" UINT64_FORMAT "," \
    "\"" FIELD_NAME_SOURCE_NAME "\":%s,"             \
    "\"" FIELD_NAME_SOURCE_PORT "\":%d,"             \
    "\"" FIELD_NAME_TARGET_NAME "\":%s,"             \
    "\"" FIELD_NAME_TARGET_PORT "\":%d"              \
    "}"

/* *INDENT-ON* */

#define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337
#define REBALANCE_PROGRESS_WAITING 0
#define REBALANCE_PROGRESS_MOVING 1
#define REBALANCE_PROGRESS_MOVED 2

/* Enumeration that defines different placement update types */
typedef enum {
    PLACEMENT_UPDATE_INVALID_FIRST = 0,
    PLACEMENT_UPDATE_MOVE = 1,
    PLACEMENT_UPDATE_COPY = 2
} PlacementUpdateType;

typedef enum {
    PLACEMENT_UPDATE_STATUS_NOT_STARTED_YET = 0,
    PLACEMENT_UPDATE_STATUS_SETTING_UP = 1,
    PLACEMENT_UPDATE_STATUS_COPYING_DATA = 2,
    PLACEMENT_UPDATE_STATUS_CATCHING_UP = 3,
    PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS = 4,
    PLACEMENT_UPDATE_STATUS_FINAL_CATCH_UP = 5,
    PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS = 6,
    PLACEMENT_UPDATE_STATUS_COMPLETING = 7,
    PLACEMENT_UPDATE_STATUS_COMPLETED = 8,
} PlacementUpdateStatus;

/*
 * PlacementUpdateEvent represents a logical unit of work that copies or
 * moves a shard placement.
 */
typedef struct PlacementUpdateEvent {
    PlacementUpdateType updateType;
    uint64 shardId;
    WorkerNode* sourceNode;
    WorkerNode* targetNode;
} PlacementUpdateEvent;

typedef struct PlacementUpdateEventProgress {
    uint64 shardId;
    char sourceName[255];
    int sourcePort;
    char targetName[255];
    int targetPort;
    PlacementUpdateType updateType;
    pg_atomic_uint64 progress;
    pg_atomic_uint64 updateStatus;
} PlacementUpdateEventProgress;

typedef struct NodeFillState {
    WorkerNode* node;

    /*
     * capacity is how big this node is, relative to the other nodes in the
     * cluster. This has no unit, it can represent whatever the user wants.
     * Some examples:
     * 1. GBs of RAM
     * 2. number of CPUs
     * 3. GBs of disk
     * 4. relative improvement of new CPU generation in newly added nodes
     */
    float4 capacity;

    /*
     * totalCost is the costs of ShardCosts on the node added together. This
     * doesn't have a unit. See the ShardCost->cost comment for some examples.
     */
    float4 totalCost;

    /*
     * utilization is how "full" the node is. This is always totalCost divided
     * by capacity. Since neither of those have a unit, this also doesn't have
     * one.
     */
    float4 utilization;

    /*
     * shardCostListDesc contains all ShardCosts that are on the current node,
     * ordered from high cost to low cost.
     */
    List* shardCostListDesc;
} NodeFillState;

typedef struct ShardCost {
    uint64 shardId;

    /*
     * cost is the cost of the shard. This doesn't have a unit.
     * Some examples of what this could represent:
     * 1. GBs of data
     * 2. number of queries per day
     */
    float4 cost;
} ShardCost;

typedef struct DisallowedPlacement {
    ShardCost* shardCost;
    NodeFillState* fillState;
} DisallowedPlacement;

typedef struct RebalancePlanFunctions {
    bool (*shardAllowedOnNode)(uint64 shardId, WorkerNode* workerNode, void* context);
    float4 (*nodeCapacity)(WorkerNode* workerNode, void* context);
    ShardCost (*shardCost)(uint64 shardId, void* context);
    void* context;
} RebalancePlanFunctions;

/* External function declarations */
extern "C" Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS);
extern "C" Datum shard_placement_replication_array(PG_FUNCTION_ARGS);
extern "C" Datum worker_node_responsive(PG_FUNCTION_ARGS);
extern "C" Datum update_shard_placement(PG_FUNCTION_ARGS);
extern "C" Datum init_rebalance_monitor(PG_FUNCTION_ARGS);
extern "C" Datum finalize_rebalance_monitor(PG_FUNCTION_ARGS);
extern "C" Datum get_rebalance_progress(PG_FUNCTION_ARGS);

extern List* RebalancePlacementUpdates(List* workerNodeList, List* shardPlacementListList,
                                       double threshold, int32 maxShardMoves,
                                       bool drainOnly,
                                       float4 utilizationImproventThreshold,
                                       RebalancePlanFunctions* rebalancePlanFunctions);
extern List* ReplicationPlacementUpdates(List* workerNodeList, List* shardPlacementList,
                                         int shardReplicationFactor);
extern void ExecuteRebalancerCommandInSeparateTransaction(char* command);
extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
                                           const char* operationName);

extern void SetupRebalanceMonitor(List* placementUpdateList, Oid relationId,
                                  uint64 initialProgressState,
                                  PlacementUpdateStatus initialStatus);

#endif /* SHARD_REBALANCER_H */
